#!/usr/bin/env node

// Copyright 2015-2016 RethinkDB, all rights reserved.

////
// Tests for feeds on the JavaScript driver
/////

var assert = require('assert');
var path = require('path');

// -- settings

// Parse a port setting (environment variable or command-line argument) as a
// base-10 integer. Returns null when the value is missing or not numeric, so
// the caller can fall through to the next source.
var parsePortSetting = function(value) {
    var port = parseInt(value, 10);
    return isNaN(port) ? null : port;
};

// Driver port: RDB_DRIVER_PORT, then the first command-line argument, then the
// default 28015. Every source is parsed the same way, so the port is always a
// number and a non-numeric argument can no longer produce a NaN port.
var driverPort  = parsePortSetting(process.env.RDB_DRIVER_PORT) || parsePortSetting(process.argv[2]) || 28015;
// Server host: RDB_SERVER_HOST, then the second command-line argument, then localhost.
var serverHost  = process.env.RDB_SERVER_HOST || process.argv[3] || 'localhost';

// Database and table the tests run against; both default to 'test'.
var dbName      = process.env.RDB_DB_NAME || 'test';
var tableName   = process.env.RDB_TABLE_NAME || 'test';

// -- load rethinkdb from the proper location

var r = require(path.resolve(__dirname, '..', 'importRethinkDB.js')).r;
var Promise = r._bluebird;

// -- globals

// primary key of the single document shared by the insert/update/replace/delete tests
var idValue = 1;

// shared connection; checked and (re)established on demand by withConnection
var reqlConn = null;
// shared changefeed on tbl; replaced with a fresh one by withTableFeed
var tableFeed = null;
// query handle for the table under test
var tbl = r.db(dbName).table(tableName);

// -- helper functions

/**
 * Ensure the shared connection `reqlConn` is usable, reconnecting if needed.
 *
 * The connection is probed by running `r.expr(1)`. When the probe fails,
 * `reqlConn` is cleared and a new connection to serverHost:driverPort is
 * opened and cached in `reqlConn`.
 *
 * Two calling styles are supported:
 *   - withConnection(fnct): returns a `function(done)` that probes the
 *     connection and then calls `fnct(done, reqlConn)`. If reconnecting fails,
 *     `done(err)` is called and `fnct` is NOT invoked.
 *   - withConnection(): returns a promise that resolves with `reqlConn`.
 */
var withConnection = function(fnct) {
    if (fnct) {
        // nested function style
        return function(done) {
            r.expr(1).run(reqlConn, function(probeErr) {
                if (!probeErr) {
                    return fnct(done, reqlConn);
                }

                // the cached connection is unusable: drop it and reconnect
                reqlConn = null;
                r.connect({host:serverHost, port:driverPort}, function(connectErr, conn) {
                    if (connectErr) {
                        // stop here: without this return the test body would still
                        // run with an undefined connection and call done() again
                        return done(connectErr);
                    }
                    reqlConn = conn;
                    return fnct(done, reqlConn);
                });
            });
        };
    } else {
        // promises style
        return r.expr(1).run(reqlConn) // check the connection
        .then(function() {
            return reqlConn;
        })
        // re-establish the connection if it was bad
        .catch(r.Error.ReqlDriverError, r.Error.ReqlRuntimeError, function(err) {
            reqlConn = null;
            return r.connect({host:serverHost, port:driverPort})
            .then(function(conn) {
                // cache the new connection
                reqlConn = conn;
                return reqlConn;
            });
        });
    }
};

/**
 * Make sure the test database and table exist and are ready for use.
 *
 * Runs three queries in sequence on the shared connection: create the
 * database if it is not listed, create the table if it is not listed, then
 * wait for the table with `waitFor: all_replicas_ready`.
 *
 * Returns a promise for the result of the final `wait` query.
 */
var withTable = function() {
    // create dbName unless dbList() already contains it
    var ensureDatabase = function() {
        var missingDatabases = r.expr([dbName]).setDifference(r.dbList());
        return missingDatabases.forEach(function(name) {
            return r.dbCreate(name);
        }).run(reqlConn);
    };

    // create tableName unless the database's tableList() already contains it
    var ensureTable = function() {
        var missingTables = r.expr([tableName]).setDifference(r.db(dbName).tableList());
        return missingTables.forEach(function(name) {
            return r.db(dbName).tableCreate(name);
        }).run(reqlConn);
    };

    // block until the table reports all replicas ready
    var waitUntilReady = function() {
        var table = r.db(dbName).table(tableName);
        return table.wait({"waitFor":"all_replicas_ready"}).run(reqlConn);
    };

    return withConnection()
        .then(ensureDatabase)
        .then(ensureTable)
        .then(waitUntilReady);
};

// ensure both tbl and tableFeed are valid
/**
 * Replace the shared `tableFeed` with a fresh changefeed on `tbl`.
 *
 * After withTable() succeeds, any existing feed is stripped of its listeners
 * and closed, then a new `tbl.changes()` feed is opened on the shared
 * connection and stored in `tableFeed`. The returned promise resolves with
 * no value.
 */
var withTableFeed = function() {
    // shut down the previous feed, if there is one
    var discardOldFeed = function() {
        if (!tableFeed) {
            return;
        }
        try {
            tableFeed.removeAllListeners();
            // keep an 'error' listener attached; without one the `close`
            // below surfaces as an unhandled error
            tableFeed.on('error', function() {});
        } catch (err) {}
        return tableFeed.close();
    };

    // open a new changefeed on the table
    var openFeed = function() {
        return tbl.changes().run(reqlConn);
    };

    // publish the new feed through the shared variable
    var rememberFeed = function(newFeed) {
        tableFeed = newFeed;
    };

    return withTable()
        .then(discardOldFeed)
        .then(openFeed)
        .then(rememberFeed);
};

// delete all items in tbl
/**
 * Empty the test table: once withTable() has succeeded, run `tbl.delete()`
 * on the shared connection. Returns a promise for the delete result.
 */
var cleanTable = function() {
    var deleteAllRows = function() {
        return tbl.delete().run(reqlConn);
    };

    return withTable().then(deleteAllRows);
};

// -- tests

describe('JavaScript changefeed', function() {
    this.timeout(5000); // Bump timeout from the default 2000ms because some operations
                        // (like table creation) may take a moment

    before(function emptyTableOnce() {
        // allow extra time for this hook
        this.timeout(10000);

        // start from an empty table; some of the tests below require continuity
        return cleanTable();
    });

    // ensure validity of reqlConn, tbl, and tableFeed
    beforeEach(function refreshSharedFeed() {
        // allow extra time for this hook
        this.timeout(10000);

        // every test gets a fresh changefeed on the table
        return withTableFeed();
    });

    describe('on a table', function() {

        it('generates a change when inserting a document', function() {
            var insertQuery = tbl.insert({'id':idValue, value:'insert'});
            var expectedChange = {
                'old_val':null,
                'new_val':{'id':idValue, 'value':'insert'}
            };

            return insertQuery.run(reqlConn)
                .then(function(writeResult) {
                    // the write must have succeeded before the feed is consulted
                    assert.equal(writeResult.errors, 0);
                    assert.equal(writeResult.inserted, 1);
                    return tableFeed.next();
                })
                .then(function(change) {
                    // an insert has no previous value
                    assert.deepEqual(change, expectedChange);
                });
        });

        it('generates a change when updating a document', function() {
            var updateQuery = tbl.get(idValue).update({value: 'update'});
            var expectedChange = {
                'old_val':{'id':idValue, 'value':'insert'},
                'new_val':{'id':idValue, 'value':'update'}
            };

            return updateQuery.run(reqlConn)
                .then(function(writeResult) {
                    // the write must have succeeded before the feed is consulted
                    assert.equal(writeResult.errors, 0);
                    assert.equal(writeResult.replaced, 1);
                    return tableFeed.next();
                })
                .then(function(change) {
                    // old_val is the document as the insert test left it
                    assert.deepEqual(change, expectedChange);
                });
        });

        it('generates a change when replacing a document', function() {
            var replaceQuery = tbl.get(idValue).replace({id: idValue, value: 'replace'});
            var expectedChange = {
                'old_val':{'id':idValue, 'value':'update'},
                'new_val':{'id':idValue, 'value':'replace'}
            };

            return replaceQuery.run(reqlConn)
                .then(function(writeResult) {
                    // the write must have succeeded before the feed is consulted
                    assert.equal(writeResult.errors, 0);
                    assert.equal(writeResult.replaced, 1);
                    return tableFeed.next();
                })
                .then(function(change) {
                    // old_val is the document as the update test left it
                    assert.deepEqual(change, expectedChange);
                });
        });

        it('generates a change when deleting a document', function() {
            var deleteQuery = tbl.get(idValue).delete();
            var expectedChange = {
                'old_val':{'id':idValue, 'value':'replace'},
                'new_val':null
            };

            return deleteQuery.run(reqlConn)
                .then(function(writeResult) {
                    // the delete must have succeeded before the feed is consulted
                    assert.equal(writeResult.errors, 0);
                    assert.equal(writeResult.deleted, 1);
                    return tableFeed.next();
                })
                .then(function(change) {
                    // a delete has no new value
                    assert.deepEqual(change, expectedChange);
                });
        });

        describe('each', function() {
            it('yields change events correctly', function(done) {
                var numItems = 3;

                // insert numItems documents, then read their changes back through each()
                r.range(0, numItems).forEach(function(index) { return tbl.insert({'fromEach':true}); }).run(reqlConn)
                .then(function(value) {
                    assert.equal(value.errors, 0);
                    assert.equal(value.inserted, numItems);

                    var counter = 0;
                    tableFeed.each(function(err, row) {
                        try {
                            assert.ifError(err);
                            assert.equal(row.new_val.fromEach, true);

                            counter += 1;
                            if (counter >= numItems) {
                                done();
                                return false; // break out of the loop
                            }
                            return true;
                        } catch (assertionErr) {
                            done(assertionErr);
                            return false;
                        }
                    });
                })
                // report a failed insert or a failed assertion on the write result
                // right away instead of leaving the test to time out
                .catch(done);
            });

            // Deactivated, waiting on https://github.com/rethinkdb/rethinkdb/issues/4421
            /*it('errors with an onFinishedCallback', function() {
                return tbl.changes().run(reqlConn)
                .then(function(tableFeed) {
                    return tableFeed.each(function() {}, function() {});
                })
                .then(function() {
                    throw new Error("Expected error on each with an onFinishedCallback was not thrown");
                })
                .catch(r.Error.ReqlRuntimeError, function(err) {
                    return true;
                });
            });*/
        });

        it('errors on hasNext', function() {
            // a feed private to this test, so it can be closed when the test ends
            var feed = null;

            return tbl.changes().run(reqlConn)
            .then(function(newFeed) {
                feed = newFeed;
                // expected to throw a ReqlDriverError
                feed.hasNext();
            })
            .then(function() {
                throw new Error("Expected error on hasNext was not thrown");
            })
            .catch(r.Error.ReqlDriverError, function() {})
            // close the extra feed regardless of the outcome so it is not left open
            .finally(function() {
                if (feed) {
                    return feed.close();
                }
            });
        });

        it('errors on toArray', function() {
            // a feed private to this test, so it can be closed when the test ends
            var feed = null;

            return tbl.changes().run(reqlConn)
            .then(function(newFeed) {
                feed = newFeed;
                // expected to throw a ReqlDriverError
                feed.toArray();
            })
            .then(function() {
                throw new Error("Expected error on toArray was not thrown");
            })
            .catch(r.Error.ReqlDriverError, function() {})
            // close the extra feed regardless of the outcome so it is not left open
            .finally(function() {
                if (feed) {
                    return feed.close();
                }
            });
        });

        it('errors on next after close', function() {
            return tbl.changes().run(reqlConn)
            .then(function(feed) {
                // close the feed opened for this test (not the shared tableFeed),
                // wait for the close to finish, then try to read from it
                return feed.close().then(function() {
                    return feed.next();
                });
            })
            .then(function() {
                assert.fail(undefined, undefined, "The expected error when a closed cursor is used was not thrown");
            })
            .catch(r.Error.ReqlDriverError, function(err) {
                assert.equal(err.message, 'Cursor is closed.');
            });
        });

        it('errors on dropped table', function() {
            // feed private to this test; the shared tableFeed is not used here
            var doomedFeed = null;

            var dropTable = function() {
                return r.db(dbName).tableDrop(tableName).run(reqlConn);
            };

            return tbl.changes().run(reqlConn)
                .then(function(feed) {
                    doomedFeed = feed;
                })
                .then(dropTable)
                .then(function() {
                    // reading from a feed whose table is gone should reject
                    return doomedFeed.next();
                })
                .then(function() {
                    throw new Error("Expected error when table was dropped was not thrown");
                })
                .catch(r.Error.ReqlRuntimeError, function() {});
        });
    });

    describe('EventEmitter interface', function() {
        var shortTimeout = 500;
        var checkDelay = 5

        // ensure we have a clean reqlConn for the next test
        afterEach(function() {
            // reqlConn is null when no connection could be established
            if (reqlConn) {
                // actually call the method; a bare property read removes nothing
                reqlConn.removeAllListeners();
            }
            // forget the connection so withConnection opens a new one
            // NOTE(review): the old connection is dropped without being closed — confirm that is intended
            reqlConn = null;
        });

        it('allows adding listener', function() {
            var noop = function() {};
            tableFeed.addListener('data', noop);

            // the feed should report exactly the listener that was just added
            var registered = tableFeed.listeners('data');
            assert.deepEqual(registered, [noop]);
        });

        it('emits an event when data is inserted', function(done) {
            var eventCounter = 0;
            var expectedEvents = 3;
            var deadline = Date.now() + shortTimeout;
            var finished = false;

            // report the outcome exactly once, whichever path gets there first
            var finish = function(err) {
                if (finished) {
                    return;
                }
                finished = true;
                done(err);
            };

            // set the event receiver
            tableFeed.on('data', function() { eventCounter += 1; });

            // do the inserts; a failed insert fails the test instead of going unhandled
            r.range(0, expectedEvents).forEach(function() { return tbl.insert({'test':'data on'}); }).run(reqlConn)
            .catch(finish);

            // check on the result periodically, up to a deadline
            var checkEvents = function() {
                if (finished) {
                    return;
                }
                if (eventCounter === expectedEvents) {
                    finish();
                } else if (Date.now() > deadline) {
                    finish(new Error('Expected ' + expectedEvents + ' events but only saw ' + eventCounter));
                } else {
                    setTimeout(checkEvents, checkDelay);
                }
            };
            checkEvents();
        });

        it('emits an event to two receivers when data is inserted', function(done) {
            var eventCounterA = 0;
            var eventCounterB = 0;
            var expectedEvents = 3;
            var deadline = Date.now() + shortTimeout;
            var finished = false;

            // report the outcome exactly once, whichever path gets there first
            var finish = function(err) {
                if (finished) {
                    return;
                }
                finished = true;
                done(err);
            };

            // set the event receivers
            tableFeed.on('data', function() { eventCounterA += 1; });
            tableFeed.on('data', function() { eventCounterB += 1; });

            // do the inserts; a failed insert fails the test instead of going unhandled
            r.range(0, expectedEvents).forEach(function() { return tbl.insert({'test':'emits an event to two receivers when data is inserted'}); }).run(reqlConn)
            .catch(finish);

            // check on the result periodically, up to a deadline
            var checkEvents = function() {
                if (finished) {
                    return;
                }
                // both receivers must have seen every event, not just the first one
                if (eventCounterA === expectedEvents && eventCounterB === expectedEvents) {
                    finish();
                } else if (Date.now() > deadline) {
                    finish(new Error('Expected ' + expectedEvents + ' events but saw ' + eventCounterA + '/' + eventCounterB));
                } else {
                    setTimeout(checkEvents, checkDelay);
                }
            };
            checkEvents();
        });

        it('emits error events', function() {
            var eventCounter = 0;
            var expectedEvents = 1;

            // set the event receiver
            tableFeed.on('error', function() { eventCounter += 1; });

            // drop the table to generate an error in the feed
            return r.db(dbName).tableDrop(tableName).run(reqlConn)
            // wait shortTimeout then check that we have only the single event
            .delay(shortTimeout)
            .then(function() {
                assert.equal(eventCounter, expectedEvents, 'Did not see the single event that was expected, rather saw: ' + eventCounter);
            });
        });

        it('supports removeListener', function() {
            var noop = function() {};
            var registered = function() {
                return tableFeed.listeners('data');
            };

            // register the same listener two times
            tableFeed.addListener('data', noop).addListener('data', noop);
            assert.deepEqual(registered(), [noop, noop]);

            // each removal takes away one registration
            tableFeed.removeListener('data', noop);
            assert.deepEqual(registered(), [noop]);

            tableFeed.removeListener('data', noop);
            assert.deepEqual(registered(), []);
        });

        it('supports removeAllListeners', function() {
            var noop = function() {};
            var registered = function() {
                return tableFeed.listeners('data');
            };

            // register the same listener two times
            tableFeed.addListener('data', noop).addListener('data', noop);
            assert.deepEqual(registered(), [noop, noop]);

            // a single call clears both registrations
            tableFeed.removeAllListeners('data');
            assert.deepEqual(registered(), []);
        });

        it('supports setMaxListeners', function() {
            // smoke test: exceeding the limit only prints a warning, so the only
            // thing verified here is that the call itself does not throw
            var listenerLimit = 40;
            tableFeed.setMaxListeners(listenerLimit);
        });

        it('supports defaultMaxListeners', function() {
            // just checking it is there, since only a warning is printed if we exceed it
            // NOTE(review): this is a bare property read with no assertion, so the test
            // passes even when the property is undefined — confirm whether a real check
            // (and on which object) is intended.
            tableFeed.defaultMaxListeners;
        });

        it('errors on cursor + EventEmitter interface', function(done) {
            var mixedUseMessage = "You cannot use the cursor interface and the EventEmitter interface at the same time.";
            var isMixedUseError = function(err) {
                return err.message === mixedUseMessage;
            };

            // attaching a listener switches the feed to the EventEmitter interface
            tableFeed.on('data', function() {});

            // both cursor-style calls must now be refused with the same message
            assert.throws(function() { tableFeed.each(); }, isMixedUseError);
            assert.throws(function() { tableFeed.next(); }, isMixedUseError);

            done();
        });

        it("doesn't emit events after close", function() {
            var seenEvents = 0;
            var insertEmptyDoc = function() {
                return tbl.insert({}).run(reqlConn);
            };

            // count every change delivered to the listener
            tableFeed.on('data', function() { seenEvents += 1; });

            // first insert happens while the feed is still open
            return insertEmptyDoc()
                .then(function() {
                    // the close promise is not returned, so the chain continues
                    // without waiting for it
                    tableFeed.close();
                })
                // second insert happens after close() has been called
                .then(insertEmptyDoc)
                // give a late event time to arrive before counting
                .delay(shortTimeout)
                .then(function() {
                    assert.equal(seenEvents, 1, 'Did not see the single event that was expected, rather saw: ' + seenEvents);
                });
        });
        
        describe('once', function() {
            it('emits only one event', function() {
                var matchingEvents = 0;

                // count only changes produced by this test's documents
                var countMatching = function(row) {
                    if (row.new_val.test == 'data once') {
                        matchingEvents += 1;
                    }
                };
                tableFeed.once('data', countMatching);

                // two documents are written, but a once-listener should fire a single time
                var twoDocs = [{'test':'data once'}, {'test':'data once'}];
                return tbl.insert(twoDocs).run(reqlConn)
                    // give a second event time to arrive before counting
                    .delay(shortTimeout)
                    .then(function() {
                        assert.equal(matchingEvents, 1, 'Did not see the single event that was expected, rather saw: ' + matchingEvents);
                        // the once-listener must have been removed after firing
                        assert.equal(tableFeed.listeners('data').length, 0, 'Unexpected listeners: ' + tableFeed.listeners('data'));
                    });
            });
        });
    });
});
